跳到主要内容

Go 调用 kafka

这里使用的客户端是 kafka-go

import "github.com/segmentio/kafka-go"

发送消息

使用 kafka.Writer 向 Kafka 服务器发送消息。

创建 Writer

w := kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
WriteTimeout: writeTimeout,
}

其中:

  • brokers 是 Kafka 服务地址列表
  • topic 是主题
  • writeTimeout 是超时时间

发送消息,如果发送失败,err 不为空

err := w.WriteMessages(
context.Background(),
kafka.Message{
Value: message,
},
)

消息发送完毕后,手动关闭 Writer

w.Close()

接收消息

使用 kafka.Reader 从 Kafka 服务器接收消息。

r = kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupId,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})

其中 groupId 是 consumer group 名称,每条消息只会向同一个 group 中某个 consumer 发送一次。

因为接收消息的程序要持续运行,所以使用 defer 语句执行关闭操作

defer r.Close()

使用无限 for 循环读取并打印每条消息

for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
log.Printf("%s", string(m.Value))
}

运行流程如下图所示: